package com.atguigu.app.dwd.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Data flow: web/app -> Nginx -> log server (log) -> Flume -> Kafka (ODS) -> FlinkApp -> Kafka (DWD)
// Programs:  Mock -> Flume (f1.sh) -> Kafka (ZK) -> BaseLogApp -> Kafka (ZK)
/**
 * Flink job that cleans the raw log topic and splits it into one Kafka topic per log type.
 *
 * <p>Pipeline, as wired in {@link #main(String[])}:
 * <ol>
 *   <li>read strings from the Kafka topic {@code topic_log};</li>
 *   <li>parse each record as JSON and drop records that cannot be processed downstream;</li>
 *   <li>key by {@code common.mid} and correct the {@code common.is_new} flag with keyed state;</li>
 *   <li>split the stream: page logs go to the main output; start, display, action and
 *       error logs go to side outputs;</li>
 *   <li>print every stream and write each one to its own Kafka topic.</li>
 * </ol>
 */
public class BaseLogApp {

    /**
     * Builds the streaming topology and submits it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {

        // Step 1: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  // in production, set this to the number of Kafka partitions

        // 1.1 Configure the state backend
        //System.setProperty("HADOOP_USER_NAME", "atguigu");
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall-flink/ck");

        // 1.2 Enable checkpointing
        //env.enableCheckpointing(3 * 60000L);
        //env.getCheckpointConfig().setCheckpointTimeout(5 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Step 2: create a stream from the Kafka topic topic_log
        String topic = "topic_log";
        String groupId = "base_log_app_211227";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // Step 3: filter out dirty records and convert the rest to JSON objects
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    // Parsing alone is not enough: blank input parses to null, and a record
                    // without common.mid or ts would fail the keyBy / state logic below and
                    // take the whole job down. Treat those records as dirty as well.
                    if (isProcessable(jsonObject)) {
                        out.collect(jsonObject);
                    } else {
                        System.out.println("Dirty>>>>" + value);
                    }
                } catch (Exception e) {
                    // Malformed JSON or a field of an unexpected type: log and drop the record.
                    System.out.println("Dirty>>>>" + value);
                }
            }
        });

        // Step 4: key the stream by device id (common.mid)
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        // Step 5: validate the new/returning-visitor flag using keyed state
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {

            // Per-mid date string recorded for the first visit seen by this job.
            private ValueState<String> lastVisitDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {

                // Date currently held in state for this mid (null if none yet)
                String lastVisitDt = lastVisitDtState.value();
                // Date of the current record
                Long ts = value.getLong("ts");
                String curDt = DateFormatUtil.toDate(ts);
                // "is_new" flag carried by the current record
                JSONObject common = value.getJSONObject("common");
                String isNew = common.getString("is_new");

                if ("1".equals(isNew)) {
                    if (lastVisitDt == null) {
                        // No state yet: remember today's date, leave the record unchanged.
                        lastVisitDtState.update(curDt);
                    } else if (!lastVisitDt.equals(curDt)) {
                        // State holds a different date, so the flag is wrong: rewrite "1" to "0".
                        common.put("is_new", "0");
                    }
                } else if (lastVisitDt == null) {
                    // Flag is not "1" but there is no state: initialise the state to the day
                    // before the record's date, so a later "1" from this mid gets corrected.
                    lastVisitDtState.update(DateFormatUtil.toDate(ts - 24 * 60 * 60 * 1000L));
                }

                return value;
            }
        });

        // Step 6: split with side outputs - page logs go to the main stream;
        // start, display, action and error logs go to side outputs
        OutputTag<String> startOutPutTag = new OutputTag<String>("start") {
        };
        OutputTag<String> displayOutPutTag = new OutputTag<String>("display") {
        };
        OutputTag<String> errorOutPutTag = new OutputTag<String>("error") {
        };
        OutputTag<String> actionOutPutTag = new OutputTag<String>("action") {
        };
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {

                // Error log: emit the complete record before anything is stripped from it.
                // A null check on the raw value is enough; no need to stringify the object.
                if (value.get("err") != null) {
                    ctx.output(errorOutPutTag, value.toJSONString());
                }

                if (value.get("start") != null) {
                    // Start log: emit the record without the error section.
                    value.remove("err");
                    ctx.output(startOutPutTag, value.toJSONString());
                } else {
                    // Page log: pull out the page info, common info and timestamp
                    JSONObject page = value.getJSONObject("page");
                    JSONObject common = value.getJSONObject("common");
                    Long ts = value.getLong("ts");

                    // Displays: emit one record per display, enriched with common, page and ts
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null) {
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject displayJson = displays.getJSONObject(i);
                            displayJson.put("common", common);
                            displayJson.put("page", page);
                            displayJson.put("ts", ts);
                            ctx.output(displayOutPutTag, displayJson.toJSONString());
                        }
                    }

                    // Actions: emit one record per action, enriched with common and page.
                    // NOTE(review): unlike displays, the record-level ts is not copied here -
                    // presumably each action already carries its own ts; confirm against the log schema.
                    JSONArray actions = value.getJSONArray("actions");
                    if (actions != null) {
                        for (int i = 0; i < actions.size(); i++) {
                            JSONObject actionJson = actions.getJSONObject(i);
                            actionJson.put("common", common);
                            actionJson.put("page", page);

                            ctx.output(actionOutPutTag, actionJson.toJSONString());
                        }
                    }

                    // Strip displays, actions and error data from what remains of the page log
                    value.remove("displays");
                    value.remove("actions");
                    value.remove("err");

                    out.collect(value.toJSONString());
                }
            }
        });

        // Step 7: collect all the streams
        DataStream<String> startDS = pageDS.getSideOutput(startOutPutTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayOutPutTag);
        DataStream<String> actionDS = pageDS.getSideOutput(actionOutPutTag);
        DataStream<String> errorDS = pageDS.getSideOutput(errorOutPutTag);

        // Print every stream to stdout with a label
        pageDS.print("page>>>>>>>>>>");
        startDS.print("startDS>>>>>>>>>>");
        displayDS.print("displayDS>>>>>>>>>>");
        actionDS.print("actionDS>>>>>>>>>>");
        errorDS.print("errorDS>>>>>>>>>>");

        // Step 8: write each stream to its Kafka topic
        String pageTopic = "dwd_traffic_page_log";
        String startTopic = "dwd_traffic_start_log";
        String displayTopic = "dwd_traffic_display_log";
        String actionTopic = "dwd_traffic_action_log";
        String errorTopic = "dwd_traffic_error_log";

        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageTopic));
        startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(startTopic));
        displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(displayTopic));
        actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(actionTopic));
        errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(errorTopic));

        // Step 9: launch the job
        env.execute("BaseLogApp");

    }

    /**
     * Tells whether a parsed log record has the fields the rest of the pipeline dereferences.
     *
     * <p>The record must be non-null, contain a {@code common} object with a non-null
     * {@code mid} (used as the key), and a non-null {@code ts} (used for date arithmetic).
     * A field of an unexpected type may make the underlying JSON accessors throw; callers
     * are expected to treat that as "not processable" too.
     *
     * @param jsonObject the parsed record, possibly {@code null}
     * @return {@code true} if the record can safely flow into the keyed stages
     */
    private static boolean isProcessable(JSONObject jsonObject) {
        if (jsonObject == null) {
            return false;
        }
        JSONObject common = jsonObject.getJSONObject("common");
        return common != null
                && common.getString("mid") != null
                && jsonObject.getLong("ts") != null;
    }

}
